package com.huan.flink.source.socket;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * flink 从socket中读取
 *
 * @author huan.fu
 * @date 2023/9/17 - 22:53
 */
public class FlinkSocketSourceApplication {

    public static void main(String[] args) throws Exception {
        //  1、创建执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2、通过 socket 读取数据， 本地使用 nc -lk 9999
        environment.socketTextStream("localhost", 9999)
                // 3、将读取到一行数据进行切割、转换
                .flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (line, collector) -> {
                    // 每一行以空格进行分隔
                    String[] words = line.split(" ");
                    for (String word : words) {
                        // 使用 collector 向下游发送数据
                        collector.collect(Tuple2.of(word, 1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                // 4、根据 word 进行分组 f0表示的是 Tuple2对象中的第一个字段的值
                .keyBy(tuple2 -> tuple2.f0)
                // 5、各分组内进行聚合 1表示的是 Tuple2对象中的第二个字段的值
                .sum(1)
                // 6、输出
                .print();

        environment.execute("read-from-socket");
    }
}
